Многопоточное программирование в Java - Тимур Машнин
CompletableFuture
Ранее мы использовали объект Future для получения результата выполнения задачи Callable в отдельном потоке.
Java 8 представляет класс CompletableFuture, который позволяет выполнять задачи в потоке, отдельном от основного потока приложения, и уведомлять основной поток о прогрессе, завершении или сбое задачи.
Таким образом, ваш основной поток не блокируется/не ждет завершения задачи и может выполнять другие задачи параллельно.
CompletableFuture является реализацией интерфейса Future, который использовался нами как ссылка на результат асинхронного вычисления.
Он предоставлял метод isDone, чтобы проверить, выполнено ли вычисление или нет, и метод get для получения результата вычисления при его выполнении.
Однако интерфейс Future имеет некоторые недостатки.
При его использовании результат вычисления невозможно завершить вручную.
Например, вы написали функцию для получения данных из удаленного сервиса.
Так как этот вызов занимает много времени, вы запускаете его в отдельном потоке и возвращаете Future из своей функции.
Предположим теперь, что, если удаленный сервис недоступен, вы хотите завершить Future вручную с кэшированными данными.
Сделать это с помощью Future нельзя.
Далее, вы не можете выполнять дальнейшие действия с результатом, полученным из Future, без блокировки, так как Future не уведомляет вас о его завершении.
Future предоставляет метод get, который блокирует до тех пор, пока результат не будет доступен.
И у вас нет возможности подключить функцию обратного вызова к Future и получить ее вызов автоматически, когда будет получен результат в будущем.
Далее, нельзя соединить вместе несколько объектов Future.
Например, вам нужно выполнить длительное вычисление, и когда вычисление будет завершено, вам нужно отправить его результат в другое длительное вычисление и так далее.
Вы не можете создать такой асинхронный рабочий поток с помощью Future.
Далее, вы не можете комбинировать несколько объектов Future вместе.
Например, у вас есть несколько различных Future, которые вы хотите запустить параллельно, а затем выполнить некоторую функцию после того, как все они завершатся.
Вы тоже не можете это сделать с помощью интерфейса Future.
Кроме того, интерфейс Future не имеет конструкции обработки исключений.
Все эти проблемы решает класс CompletedFuture, который реализует интерфейсы Future и CompletionStage и предоставляет набор методов для создания, соединения и объединения нескольких Future. Он также поддерживает обработку исключений.
Для выполнения задачи в отдельном потоке, мы применяем статический метод supplyAsync, который принимает реализацию интерфейса Supplier и выполняет эту реализацию в потоке пула потоков ForkJoinPool.
Интерфейс Supplier имеет единственный метод get, который необходимо определить.
Если вы хотите принудительно завершить вычисление, вы вызываете метод complete объекта CompletableFuture и возвращаете результат, который вы указали в качестве аргумента метода.
Если вы хотите выполнить задачу без возврата результата, вы вызываете метод runAsync и передаете ему объект Runnable.
Также вы можете выполнить задачу в каком-либо другом пуле потоков, явно указав Executor.
Пока что мы не видим явных преимуществ перед Future, так как для получения результата мы просто вызываем блокирующий метод get.
Для создания полностью асинхронного кода, мы должны иметь возможность подключить обратный вызов к CompletableFuture, который должен автоматически вызываться при завершении вычисления.
И мы сможем написать логику, которая должна быть выполнена после завершения Future внутри нашей функции обратного вызова.
Можно присоединить обратный вызов к CompletableFuture, используя методы thenApply, thenAccept и thenRun.
Вы можете использовать метод thenApply для обработки и преобразования результата CompletableFuture, когда он поступит.
Этот метод принимает реализацию интерфейса Function как аргумент.
Интерфейс Function содержит метод, который принимает аргумент и возвращает результат.
Если вы не хотите возвращать что-либо из своей функции обратного вызова и просто хотите запустить некий код после завершения Future, вы можете использовать методы thenAccept и thenRun. Эти методы используются в качестве последнего обратного вызова в цепочке обратных вызовов.
Метод thenAccept принимает реализацию интерфейса Consumer, который имеет метод accept, принимающий аргумент и ничего не возвращающий.
Этот метод имеет доступ к результату CompletableFuture.
Метод thenRun принимает объект Runnable и не имеет доступа к результату Future.
У всех этих метод есть вариация Async, которая запускает код в отдельном потоке.
Вы также можете создать цепочку вложенных CompletableFuture.
Здесь вы сначала получаете идентификатор клиента, а затем передаете его методу thenCompose и возвращаете баланс клиента.
Теперь, если вы хотите вычислить две задачи независимо друг от друга, а затем что-то вычислить с использованием их результатов, вместо метода thenCompose нужно применить метод thenCombine.
В этом примере мы независимо вычисляет ширину и высоту, а затем вычисляем площадь.
Мы использовали методы thenCompose и thenCombine чтобы соединить и объединить два CompletableFutures вместе.
Теперь, если вы хотите совместить произвольное число CompletableFutures, для этого можно использовать методы allOf и anyOf.
Далее, если у нас появилась ошибка в цепочке вызовов, мы можем легко ее обработать, и ошибка не будет распространяться дальше в цепочке обратных вызовов.
ManagedBlocker
Фреймворк Fork/Join дает возможность использовать один общий пул потоков ForkJoinPool, который предварительно сконфигурирован JVM и используется для распараллеливания потоков и выполнения задач, заданных, например, с помощью CompletableFuture.supplyAsync и так далее.
Это звучит хорошо.
Однако то, что пул потоков ForkJoinPool является общим, означает совместное использование всеми компонентами, работающими в одном JVM-процессе.
Если вы загрязните пул потоков блокирующими или длительными задачами, вы можете остановить работу всего процесса JVM, который у вас есть.
Однако ForkJoinPool был разработан с идеей о том, что некоторые задачи могут блокировать рабочие потоки, поэтому он содержит API для обработки таких случаев блокировки.
Для этого используется интерфейс ManagedBlocker, обеспечивающий способ сообщить ForkJoinPool, что он должен расширить свой параллелизм, чтобы компенсировать потенциальные заблокированные рабочие потоки.
Интерфейс ManagedBlocker предоставляет два метода.
Метод isReleasable должен возвращать true, если блокировка не требуется.
Метод block блокирует текущий поток, если это необходимо.
Эти действия выполняются любым потоком, вызывающим метод ForkJoinPool.managedBlock.
Этот метод запускает блокирующую задачу.
При этом активируется